home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.4)
-
- import unittest
- from test import test_support
- import subprocess
- import sys
- import signal
- import os
- import tempfile
- import time
- import re
- mswindows = sys.platform == 'win32'
- if mswindows:
- SETBINARY = 'import msvcrt; msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY);'
- else:
- SETBINARY = ''
-
- def remove_stderr_debug_decorations(stderr):
- return re.sub('\\[\\d+ refs\\]\\r?\\n?$', '', stderr)
-
-
- class ProcessTestCase(unittest.TestCase):
-
- def mkstemp(self):
- '''wrapper for mkstemp, calling mktemp if mkstemp is not available'''
- if hasattr(tempfile, 'mkstemp'):
- return tempfile.mkstemp()
- else:
- fname = tempfile.mktemp()
- return (os.open(fname, os.O_RDWR | os.O_CREAT), fname)
-
-
- def test_call_seq(self):
- rc = subprocess.call([
- sys.executable,
- '-c',
- 'import sys; sys.exit(47)'])
- self.assertEqual(rc, 47)
-
-
- def test_call_kwargs(self):
- newenv = os.environ.copy()
- newenv['FRUIT'] = 'banana'
- rc = subprocess.call([
- sys.executable,
- '-c',
- 'import sys, os;sys.exit(os.getenv("FRUIT")=="banana")'], env = newenv)
- self.assertEqual(rc, 1)
-
-
- def test_stdin_none(self):
- p = subprocess.Popen([
- sys.executable,
- '-c',
- 'print "banana"'], stdout = subprocess.PIPE, stderr = subprocess.PIPE)
- p.wait()
- self.assertEqual(p.stdin, None)
-
-
- def test_stdout_none(self):
- p = subprocess.Popen([
- sys.executable,
- '-c',
- 'print " this bit of output is from a test of stdout in a different process ..."'], stdin = subprocess.PIPE, stderr = subprocess.PIPE)
- p.wait()
- self.assertEqual(p.stdout, None)
-
-
- def test_stderr_none(self):
- p = subprocess.Popen([
- sys.executable,
- '-c',
- 'print "banana"'], stdin = subprocess.PIPE, stdout = subprocess.PIPE)
- p.wait()
- self.assertEqual(p.stderr, None)
-
-
- def test_executable(self):
- p = subprocess.Popen([
- 'somethingyoudonthave',
- '-c',
- 'import sys; sys.exit(47)'], executable = sys.executable)
- p.wait()
- self.assertEqual(p.returncode, 47)
-
-
- def test_stdin_pipe(self):
- p = subprocess.Popen([
- sys.executable,
- '-c',
- 'import sys; sys.exit(sys.stdin.read() == "pear")'], stdin = subprocess.PIPE)
- p.stdin.write('pear')
- p.stdin.close()
- p.wait()
- self.assertEqual(p.returncode, 1)
-
-
- def test_stdin_filedes(self):
- tf = tempfile.TemporaryFile()
- d = tf.fileno()
- os.write(d, 'pear')
- os.lseek(d, 0, 0)
- p = subprocess.Popen([
- sys.executable,
- '-c',
- 'import sys; sys.exit(sys.stdin.read() == "pear")'], stdin = d)
- p.wait()
- self.assertEqual(p.returncode, 1)
-
-
- def test_stdin_fileobj(self):
- tf = tempfile.TemporaryFile()
- tf.write('pear')
- tf.seek(0)
- p = subprocess.Popen([
- sys.executable,
- '-c',
- 'import sys; sys.exit(sys.stdin.read() == "pear")'], stdin = tf)
- p.wait()
- self.assertEqual(p.returncode, 1)
-
-
- def test_stdout_pipe(self):
- p = subprocess.Popen([
- sys.executable,
- '-c',
- 'import sys; sys.stdout.write("orange")'], stdout = subprocess.PIPE)
- self.assertEqual(p.stdout.read(), 'orange')
-
-
- def test_stdout_filedes(self):
- tf = tempfile.TemporaryFile()
- d = tf.fileno()
- p = subprocess.Popen([
- sys.executable,
- '-c',
- 'import sys; sys.stdout.write("orange")'], stdout = d)
- p.wait()
- os.lseek(d, 0, 0)
- self.assertEqual(os.read(d, 1024), 'orange')
-
-
- def test_stdout_fileobj(self):
- tf = tempfile.TemporaryFile()
- p = subprocess.Popen([
- sys.executable,
- '-c',
- 'import sys; sys.stdout.write("orange")'], stdout = tf)
- p.wait()
- tf.seek(0)
- self.assertEqual(tf.read(), 'orange')
-
-
- def test_stderr_pipe(self):
- p = subprocess.Popen([
- sys.executable,
- '-c',
- 'import sys; sys.stderr.write("strawberry")'], stderr = subprocess.PIPE)
- self.assertEqual(remove_stderr_debug_decorations(p.stderr.read()), 'strawberry')
-
-
- def test_stderr_filedes(self):
- tf = tempfile.TemporaryFile()
- d = tf.fileno()
- p = subprocess.Popen([
- sys.executable,
- '-c',
- 'import sys; sys.stderr.write("strawberry")'], stderr = d)
- p.wait()
- os.lseek(d, 0, 0)
- self.assertEqual(remove_stderr_debug_decorations(os.read(d, 1024)), 'strawberry')
-
-
- def test_stderr_fileobj(self):
- tf = tempfile.TemporaryFile()
- p = subprocess.Popen([
- sys.executable,
- '-c',
- 'import sys; sys.stderr.write("strawberry")'], stderr = tf)
- p.wait()
- tf.seek(0)
- self.assertEqual(remove_stderr_debug_decorations(tf.read()), 'strawberry')
-
-
- def test_stdout_stderr_pipe(self):
- p = subprocess.Popen([
- sys.executable,
- '-c',
- 'import sys;sys.stdout.write("apple");sys.stdout.flush();sys.stderr.write("orange")'], stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
- output = p.stdout.read()
- stripped = remove_stderr_debug_decorations(output)
- self.assertEqual(stripped, 'appleorange')
-
-
- def test_stdout_stderr_file(self):
- tf = tempfile.TemporaryFile()
- p = subprocess.Popen([
- sys.executable,
- '-c',
- 'import sys;sys.stdout.write("apple");sys.stdout.flush();sys.stderr.write("orange")'], stdout = tf, stderr = tf)
- p.wait()
- tf.seek(0)
- output = tf.read()
- stripped = remove_stderr_debug_decorations(output)
- self.assertEqual(stripped, 'appleorange')
-
-
- def test_cwd(self):
- tmpdir = os.getenv('TEMP', '/tmp')
- cwd = os.getcwd()
- os.chdir(tmpdir)
- tmpdir = os.getcwd()
- os.chdir(cwd)
- p = subprocess.Popen([
- sys.executable,
- '-c',
- 'import sys,os;sys.stdout.write(os.getcwd())'], stdout = subprocess.PIPE, cwd = tmpdir)
- normcase = os.path.normcase
- self.assertEqual(normcase(p.stdout.read()), normcase(tmpdir))
-
-
- def test_env(self):
- newenv = os.environ.copy()
- newenv['FRUIT'] = 'orange'
- p = subprocess.Popen([
- sys.executable,
- '-c',
- 'import sys,os;sys.stdout.write(os.getenv("FRUIT"))'], stdout = subprocess.PIPE, env = newenv)
- self.assertEqual(p.stdout.read(), 'orange')
-
-
- def test_communicate(self):
- p = subprocess.Popen([
- sys.executable,
- '-c',
- 'import sys,os;sys.stderr.write("pineapple");sys.stdout.write(sys.stdin.read())'], stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
- (stdout, stderr) = p.communicate('banana')
- self.assertEqual(stdout, 'banana')
- self.assertEqual(remove_stderr_debug_decorations(stderr), 'pineapple')
-
-
- def test_communicate_returns(self):
- p = subprocess.Popen([
- sys.executable,
- '-c',
- 'import sys; sys.exit(47)'])
- (stdout, stderr) = p.communicate()
- self.assertEqual(stdout, None)
- self.assertEqual(stderr, None)
-
-
- def test_communicate_pipe_buf(self):
- (x, y) = os.pipe()
- if mswindows:
- pipe_buf = 512
- else:
- pipe_buf = os.fpathconf(x, 'PC_PIPE_BUF')
- os.close(x)
- os.close(y)
- p = subprocess.Popen([
- sys.executable,
- '-c',
- 'import sys,os;sys.stdout.write(sys.stdin.read(47));sys.stderr.write("xyz"*%d);sys.stdout.write(sys.stdin.read())' % pipe_buf], stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
- string_to_write = 'abc' * pipe_buf
- (stdout, stderr) = p.communicate(string_to_write)
- self.assertEqual(stdout, string_to_write)
-
-
- def test_writes_before_communicate(self):
- p = subprocess.Popen([
- sys.executable,
- '-c',
- 'import sys,os;sys.stdout.write(sys.stdin.read())'], stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
- p.stdin.write('banana')
- (stdout, stderr) = p.communicate('split')
- self.assertEqual(stdout, 'bananasplit')
- self.assertEqual(remove_stderr_debug_decorations(stderr), '')
-
-
- def test_universal_newlines(self):
- p = subprocess.Popen([
- sys.executable,
- '-c',
- 'import sys,os;' + SETBINARY + 'sys.stdout.write("line1\\n");sys.stdout.flush();sys.stdout.write("line2\\r");sys.stdout.flush();sys.stdout.write("line3\\r\\n");sys.stdout.flush();sys.stdout.write("line4\\r");sys.stdout.flush();sys.stdout.write("\\nline5");sys.stdout.flush();sys.stdout.write("\\nline6");'], stdout = subprocess.PIPE, universal_newlines = 1)
- stdout = p.stdout.read()
- if hasattr(open, 'newlines'):
- self.assertEqual(stdout, 'line1\nline2\nline3\nline4\nline5\nline6')
- else:
- self.assertEqual(stdout, 'line1\nline2\rline3\r\nline4\r\nline5\nline6')
-
-
- def test_universal_newlines_communicate(self):
- p = subprocess.Popen([
- sys.executable,
- '-c',
- 'import sys,os;' + SETBINARY + 'sys.stdout.write("line1\\n");sys.stdout.flush();sys.stdout.write("line2\\r");sys.stdout.flush();sys.stdout.write("line3\\r\\n");sys.stdout.flush();sys.stdout.write("line4\\r");sys.stdout.flush();sys.stdout.write("\\nline5");sys.stdout.flush();sys.stdout.write("\\nline6");'], stdout = subprocess.PIPE, stderr = subprocess.PIPE, universal_newlines = 1)
- (stdout, stderr) = p.communicate()
- if hasattr(open, 'newlines'):
- self.assertEqual(stdout, 'line1\nline2\nline3\nline4\nline5\nline6')
- else:
- self.assertEqual(stdout, 'line1\nline2\rline3\r\nline4\r\nline5\nline6')
-
-
- def test_no_leaking(self):
- if test_support.is_resource_enabled('subprocess') and not mswindows:
- max_handles = 1026
- else:
- max_handles = 65
- for i in range(max_handles):
- p = subprocess.Popen([
- sys.executable,
- '-c',
- 'import sys;sys.stdout.write(sys.stdin.read())'], stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
- data = p.communicate('lime')[0]
- self.assertEqual(data, 'lime')
-
-
-
- def test_list2cmdline(self):
- self.assertEqual(subprocess.list2cmdline([
- 'a b c',
- 'd',
- 'e']), '"a b c" d e')
- self.assertEqual(subprocess.list2cmdline([
- 'ab"c',
- '\\',
- 'd']), 'ab\\"c \\ d')
- self.assertEqual(subprocess.list2cmdline([
- 'a\\\\\\b',
- 'de fg',
- 'h']), 'a\\\\\\b "de fg" h')
- self.assertEqual(subprocess.list2cmdline([
- 'a\\"b',
- 'c',
- 'd']), 'a\\\\\\"b c d')
- self.assertEqual(subprocess.list2cmdline([
- 'a\\\\b c',
- 'd',
- 'e']), '"a\\\\b c" d e')
- self.assertEqual(subprocess.list2cmdline([
- 'a\\\\b\\ c',
- 'd',
- 'e']), '"a\\\\b\\ c" d e')
-
-
- def test_poll(self):
- p = subprocess.Popen([
- sys.executable,
- '-c',
- 'import time; time.sleep(1)'])
- count = 0
- while p.poll() is None:
- time.sleep(0.10000000000000001)
- count += 1
- self.assert_(count >= 2)
- self.assertEqual(p.poll(), 0)
-
-
- def test_wait(self):
- p = subprocess.Popen([
- sys.executable,
- '-c',
- 'import time; time.sleep(2)'])
- self.assertEqual(p.wait(), 0)
- self.assertEqual(p.wait(), 0)
-
-
- def test_invalid_bufsize(self):
-
- try:
- subprocess.Popen([
- sys.executable,
- '-c',
- 'pass'], 'orange')
- except TypeError:
- pass
-
- self.fail('Expected TypeError')
-
- if not mswindows:
-
- def test_exceptions(self):
-
- try:
- p = subprocess.Popen([
- sys.executable,
- '-c',
- ''], cwd = '/this/path/does/not/exist')
- except OSError:
- e = None
- self.assertNotEqual(e.child_traceback.find('os.chdir'), -1)
-
- self.fail('Expected OSError')
-
-
- def test_run_abort(self):
- p = subprocess.Popen([
- sys.executable,
- '-c',
- 'import os; os.abort()'])
- p.wait()
- self.assertEqual(-(p.returncode), signal.SIGABRT)
-
-
- def test_preexec(self):
- p = subprocess.Popen([
- sys.executable,
- '-c',
- 'import sys,os;sys.stdout.write(os.getenv("FRUIT"))'], stdout = subprocess.PIPE, preexec_fn = (lambda : os.putenv('FRUIT', 'apple')))
- self.assertEqual(p.stdout.read(), 'apple')
-
-
- def test_args_string(self):
- (f, fname) = self.mkstemp()
- os.write(f, '#!/bin/sh\n')
- os.write(f, "exec %s -c 'import sys; sys.exit(47)'\n" % sys.executable)
- os.close(f)
- os.chmod(fname, 448)
- p = subprocess.Popen(fname)
- p.wait()
- os.remove(fname)
- self.assertEqual(p.returncode, 47)
-
-
- def test_invalid_args(self):
- self.assertRaises(ValueError, subprocess.call, [
- sys.executable,
- '-c',
- 'import sys; sys.exit(47)'], startupinfo = 47)
- self.assertRaises(ValueError, subprocess.call, [
- sys.executable,
- '-c',
- 'import sys; sys.exit(47)'], creationflags = 47)
-
-
- def test_shell_sequence(self):
- newenv = os.environ.copy()
- newenv['FRUIT'] = 'apple'
- p = subprocess.Popen([
- 'echo $FRUIT'], shell = 1, stdout = subprocess.PIPE, env = newenv)
- self.assertEqual(p.stdout.read().strip(), 'apple')
-
-
- def test_shell_string(self):
- newenv = os.environ.copy()
- newenv['FRUIT'] = 'apple'
- p = subprocess.Popen('echo $FRUIT', shell = 1, stdout = subprocess.PIPE, env = newenv)
- self.assertEqual(p.stdout.read().strip(), 'apple')
-
-
- def test_call_string(self):
- (f, fname) = self.mkstemp()
- os.write(f, '#!/bin/sh\n')
- os.write(f, "exec %s -c 'import sys; sys.exit(47)'\n" % sys.executable)
- os.close(f)
- os.chmod(fname, 448)
- rc = subprocess.call(fname)
- os.remove(fname)
- self.assertEqual(rc, 47)
-
-
- if mswindows:
-
- def test_startupinfo(self):
- STARTF_USESHOWWINDOW = 1
- SW_MAXIMIZE = 3
- startupinfo = subprocess.STARTUPINFO()
- startupinfo.dwFlags = STARTF_USESHOWWINDOW
- startupinfo.wShowWindow = SW_MAXIMIZE
- subprocess.call([
- sys.executable,
- '-c',
- 'import sys; sys.exit(0)'], startupinfo = startupinfo)
-
-
- def test_creationflags(self):
- CREATE_NEW_CONSOLE = 16
- sys.stderr.write(' a DOS box should flash briefly ...\n')
- subprocess.call(sys.executable + ' -c "import time; time.sleep(0.25)"', creationflags = CREATE_NEW_CONSOLE)
-
-
- def test_invalid_args(self):
- self.assertRaises(ValueError, subprocess.call, [
- sys.executable,
- '-c',
- 'import sys; sys.exit(47)'], preexec_fn = (lambda : 1))
- self.assertRaises(ValueError, subprocess.call, [
- sys.executable,
- '-c',
- 'import sys; sys.exit(47)'], close_fds = True)
-
-
- def test_shell_sequence(self):
- newenv = os.environ.copy()
- newenv['FRUIT'] = 'physalis'
- p = subprocess.Popen([
- 'set'], shell = 1, stdout = subprocess.PIPE, env = newenv)
- self.assertNotEqual(p.stdout.read().find('physalis'), -1)
-
-
- def test_shell_string(self):
- newenv = os.environ.copy()
- newenv['FRUIT'] = 'physalis'
- p = subprocess.Popen('set', shell = 1, stdout = subprocess.PIPE, env = newenv)
- self.assertNotEqual(p.stdout.read().find('physalis'), -1)
-
-
- def test_call_string(self):
- rc = subprocess.call(sys.executable + ' -c "import sys; sys.exit(47)"')
- self.assertEqual(rc, 47)
-
-
-
-
- def test_main():
- test_support.run_unittest(ProcessTestCase)
-
- if __name__ == '__main__':
- test_main()
-
-